{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Step 4: Model operationalization & Deployment\n",
    "\n",
    "In this script, we load the model from the `3_model_building.ipynb` Jupyter notebook and the labeled feature data set constructed in the `2_feature_engineering.ipynb` notebook in order to build the model deployment artifacts. \n",
    "\n",
    "\n",
    "The remainder of this notebook details steps required to deploy and operationalize the model using Azure Machine Learning service SDK."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "## setup our environment by importing required libraries\n",
    "import json\n",
    "import os\n",
    "import shutil\n",
    "import time\n",
    "\n",
    "from pyspark.ml import Pipeline\n",
    "from pyspark.ml.classification import RandomForestClassifier\n",
    "\n",
    "# for creating pipelines and model\n",
    "from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer\n",
    "\n",
    "# setup the pyspark environment\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "# AML SDK libraries\n",
    "from azureml.core import Workspace, Run\n",
    "from azureml.core.model import Model\n",
    "from azureml.core.image import ContainerImage\n",
    "from azureml.core.conda_dependencies import CondaDependencies \n",
    "from azureml.core.webservice import AciWebservice,Webservice\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We need to load the feature data set from memory to construct the operationalization schema."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "features_file = 'featureengineering_files.parquet'\n",
    "target_dir = \"dbfs:/dataset/\"\n",
    "\n",
    "feat_data = spark.read.parquet(os.path.join(target_dir,features_file))\n",
    "feat_data.limit(5).toPandas().head(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The steps that makes up model operationalization are:\n",
    "\n",
    "- The model you trained in notebook 3_model_building\n",
    "- A scoring script to show how to use the model\n",
    "- Conda yml file containing packages need to be installed\n",
    "- A configuration definition object to build the ACI\n",
    "\n",
    "\n",
    "For more details, go to https://docs.microsoft.com/en-us/azure/machine-learning/service/concept-model-management-and-deployment"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here, we copy the model saved in notebook 03_building_model to local directory"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "model_name = 'pdmrfull.model'\n",
    "model_local = \"file:\" + os.getcwd() + \"/\" + model_name\n",
    "model_dir = os.path.join(\"dbfs:/model/\", model_name)\n",
    "dbutils.fs.cp(model_dir, model_local, True)\n",
    "display(dbutils.fs.ls(model_local))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We register the model in the experiment in Azure Machine learning service"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "ws = Workspace.from_config()\n",
    "model_name = 'pdmrfull.model'\n",
    "model = Model.register(model_path= model_name, model_name=model_name , workspace=ws)\n",
    "print(\"Registered:\", model.name)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here we define the conda dependencies used by scoring script."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "conda_env = CondaDependencies.create(conda_packages=['pyspark'])\n",
    "with open(\"conda_env.yml\",\"w\") as f:\n",
    "    f.write(conda_env.serialize_to_string())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here we define the scoring script that will be backed into docker image for prediction serving"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile score.py\n",
    "\n",
    "from azureml.core.model import Model\n",
    "from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer\n",
    "from pyspark.ml import PipelineModel\n",
    "import pyspark\n",
    "import json\n",
    "\n",
    "def init():\n",
    "    \n",
    "    global pipeline,spark\n",
    "        \n",
    "    spark = pyspark.sql.SparkSession.builder.appName(\"Predictive maintenance service\").getOrCreate()\n",
    "    model_path = Model.get_model_path('pdmrfull.model')\n",
    "    pipeline = PipelineModel.load(model_path)\n",
    "    \n",
    "\n",
    "def run(raw_data):\n",
    "    \n",
    "    try:\n",
    "        sc = spark.sparkContext\n",
    "        input_list = json.loads(raw_data)\n",
    "        input_rdd = sc.parallelize(input_list)\n",
    "        input_df = spark.read.json(input_rdd)\n",
    "        \n",
    "        key_cols =['label_e','machineID','dt_truncated', 'failure','model_encoded','model']\n",
    "        input_features = input_df.columns\n",
    "        \n",
    "        # Remove unseen features by the model during training\n",
    "        input_features = [x for x in input_features if x not in set(key_cols)]\n",
    "        \n",
    "        \n",
    "        va = VectorAssembler(inputCols=(input_features), outputCol='features')\n",
    "        data = va.transform(input_df).select('machineID','features')\n",
    "        score = pipeline.transform(data)\n",
    "        predictions = score.collect()\n",
    "        \n",
    "        preds = [str(x['prediction']) for x in predictions]\n",
    "        result = preds\n",
    "    except Exception as e:\n",
    "        result = str(e)\n",
    "        \n",
    "    return json.dumps({\"result\":result})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "###  Provision ACI for image deployment\n",
    "\n",
    "Note that this may take a couple of minutes for ACI deployment to complete"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "image_config = ContainerImage.image_configuration(runtime= \"spark-py\",\n",
    "                                 execution_script=\"score.py\",\n",
    "                                 conda_file=\"conda_env.yml\")\n",
    "\n",
    "aci_config = AciWebservice.deploy_configuration(cpu_cores = 2, \n",
    "                                               memory_gb = 4, \n",
    "                                               tags = {'type': \"predictive_maintenance\"}, \n",
    "                                               description = \"Predictive maintenance classifier\")\n",
    "\n",
    "\n",
    "\n",
    "aci_service_name = 'pred-maintenance-service'\n",
    "print(aci_service_name)\n",
    "\n",
    "aci_service = Webservice.deploy_from_model(workspace=ws, \n",
    "                                        name=aci_service_name,\n",
    "                                        deployment_config = aci_config,\n",
    "                                        models = [model],\n",
    "                                        image_config = image_config\n",
    "                                          )\n",
    "\n",
    "\n",
    "aci_service.wait_for_deployment(True)\n",
    "print(aci_service.state)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Invoke web service endpoint for prediction\n",
    "\n",
    "First we get a sample test observation that we can score. For this, we can randomly select a single record from the test data.."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "test_sample = (feat_data.sample(False, .8).limit(1))\n",
    "excluded_cols = {'label_e','machineID','dt_truncated','failure','model_encoded','model'}\n",
    "input_features = set(test_sample.columns)- excluded_cols\n",
    "\n",
    "\n",
    "raw_input = test_sample.toJSON().collect()\n",
    "prediction = aci_service.run(json.dumps(raw_input))\n",
    "\n",
    "print(prediction)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Delete ACI service to free up resources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "aci_service.delete()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PredictiveMaintenance dlvmjme",
   "language": "python",
   "name": "predictivemaintenance_dlvmjme"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": "3"
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.2"
  },
  "name": "4_operationalization",
  "notebookId": 3296429392325481
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
